Source code for hysop.backend.host.python.operator.penalization

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from hysop.constants import PenalizationFormulation
from hysop.backend.host.host_operator import HostOperator
from hysop.tools.htypes import check_instance, InstanceOf
from hysop.tools.decorators import debug
from hysop.fields.continuous_field import Field
from hysop.parameters.scalar_parameter import ScalarParameter
from hysop.parameters.tensor_parameter import TensorParameter
from hysop.topology.cartesian_descriptor import CartesianTopologyDescriptors
from hysop.methods import SpaceDiscretization
from hysop.core.memory.memory_request import MemoryRequest
from hysop.core.graph.graph import op_apply
from hysop.numerics.stencil.stencil_generator import (
    StencilGenerator,
    CenteredStencilGenerator,
    MPQ,
)
import numpy as np


[docs] class CommonPenalization: """ """ __default_method = {SpaceDiscretization: SpaceDiscretization.FDC4} __available_methods = { SpaceDiscretization: (InstanceOf(SpaceDiscretization), InstanceOf(int)) }
[docs] @classmethod def default_method(cls): dm = super().default_method() dm.update(cls.__default_method) return dm
[docs] @classmethod def available_methods(cls): am = super().available_methods() am.update(cls.__available_methods) return am
@debug def __new__(cls, **kwds): return super().__new__(cls, **kwds) @debug def __init__( self, obstacles, velocity, dt, coeff=None, ubar=None, formulation=None, **kwds ): super().__init__(**kwds) check_instance(dt, ScalarParameter) check_instance( coeff, (ScalarParameter, float, type(lambda x: x)), allow_none=True ) check_instance(ubar, (TensorParameter, Field), allow_none=True) check_instance(formulation, PenalizationFormulation, allow_none=True) check_instance( obstacles, (tuple, dict), values=Field, keys=(ScalarParameter, float, type(lambda x: x)), check_kwds=False, ) if isinstance(coeff, ScalarParameter): self.coeff = lambda o: coeff() * o elif isinstance(coeff, type(lambda x: x)): self.coeff = coeff elif not coeff is None: c = ScalarParameter("penal_coeff", initial_value=coeff) self.coeff = lambda o: c() * o if isinstance(obstacles, dict): obs = {} for c, o in obstacles.items(): if isinstance(c, ScalarParameter): obs[lambda x: c() * x] = o elif isinstance(c, type(lambda x: x)): obs[c] = o elif not c is None: _ = ScalarParameter("penal_coeff", initial_value=c) obs[lambda x: _() * x] = o else: obs = {} for o in obstacles: obs[self.coeff] = o for o in obs.values(): assert o.nb_components == 1 self._ubar = ubar if ubar is None: self._ubar = TensorParameter( name="ubar", shape=(velocity.dim,), quiet=True, dtype=velocity.dtype, initial_value=(0.0,) * velocity.dim, ) if formulation is None or formulation is PenalizationFormulation.IMPLICIT: self._compute_penalization = self._compute_penalization_implicit else: self._compute_penalization = self._compute_penalization_exact self.obstacles = obs def _compute_penalization_implicit(self): dt = self.dt() self.tmp[...] = 0.0 for c, o in self.dobstacles.items(): self.tmp[...] += (-dt) * c(o) / (1.0 + dt * c(o)) def _compute_penalization_exact(self): dt = self.dt() self.tmp[...] = 1.0 for c, o in self.dobstacles.items(): self.tmp[...] *= np.exp(-dt * c(o)) self.tmp[...] -= 1.0
[docs] class PythonPenalizeVorticity(HostOperator, CommonPenalization): """ """ __default_method = {SpaceDiscretization: SpaceDiscretization.FDC4} __available_methods = { SpaceDiscretization: (InstanceOf(SpaceDiscretization), InstanceOf(int)) }
[docs] @classmethod def default_method(cls): dm = super().default_method() dm.update(cls.__default_method) return dm
[docs] @classmethod def available_methods(cls): am = super().available_methods() am.update(cls.__available_methods) return am
@debug def __new__( cls, obstacles, variables, velocity, vorticity, dt, coeff=None, ubar=None, formulation=None, **kwds, ): return super().__new__( cls, input_fields=None, output_fields=None, input_params=None, **kwds ) @debug def __init__( self, obstacles, variables, velocity, vorticity, dt, coeff=None, ubar=None, formulation=None, **kwds, ): check_instance(velocity, Field) check_instance(vorticity, Field) check_instance(variables, dict, keys=Field, values=CartesianTopologyDescriptors) input_fields = {velocity: variables[velocity], vorticity: variables[vorticity]} output_fields = {vorticity: variables[vorticity]} input_params = {dt} for o in obstacles.values() if isinstance(obstacles, dict) else obstacles: input_fields[o] = variables[o] if isinstance(ubar, Field): input_fields[ubar] = variables[ubar] self.velocity = velocity self.vorticity = vorticity self.dt = dt super().__init__( input_fields=input_fields, output_fields=output_fields, input_params=input_params, obstacles=obstacles, velocity=velocity, dt=dt, coeff=coeff, ubar=ubar, formulation=formulation, **kwds, )
[docs] @debug def handle_method(self, method): super().handle_method(method) self.space_discretization = method.pop(SpaceDiscretization) csg = CenteredStencilGenerator() csg.configure(dtype=MPQ, dim=1) try: self.order = int(str(self.space_discretization)[3:]) except: self.order = self.space_discretization self.stencil = tuple( csg.generate_exact_stencil(derivative=1, order=self.order) for _ in range(3) )
[docs] @debug def get_field_requirements(self): requirements = super().get_field_requirements() stencil = self.stencil[0] G = max(max(a, b) for a, b in zip(stencil.L, stencil.R)) for is_input, (field, td, req) in requirements.iter_requirements(): min_ghosts = (max(g, G) for g in req.min_ghosts.copy()) req.min_ghosts = min_ghosts req.axes = (tuple(range(field.dim)),) return requirements
[docs] @debug def discretize(self): if self.discretized: return super().discretize() dim = self.velocity.dim self.dvelocity = self.input_discrete_tensor_fields[self.velocity] if dim == 2: self.dvorticity = self.input_discrete_fields[self.vorticity] if dim == 3: self.dvorticity = self.input_discrete_tensor_fields[self.vorticity] dv, dw = self.dvelocity, self.dvorticity stencil = self.stencil[0] G = max(max(a, b) for a, b in zip(stencil.L, stencil.R)) view = dv.local_slices(ghosts=(G,) * dim) V = tuple(Vi[view] for Vi in dv.buffers) view = dw.local_slices(ghosts=(G,) * dim) W = tuple(Wi[view] for Wi in dw.buffers) self.W, self.V = W, V self.dobstacles = {} for c, o in self.obstacles.items(): o_df = self.input_discrete_fields[o] self.dobstacles[c] = o_df.data[0][o_df.local_slices(ghosts=(G,) * dim)] for s, dx in zip(self.stencil, dw.space_step): s.replace_symbols({s.dx: dx}) self.w_ghost_exchanger = dw.build_ghost_exchanger() self.v_ghost_exchanger = dv.build_ghost_exchanger() if isinstance(self._ubar, TensorParameter): self.get_ubar = lambda: self._ubar()[::-1] else: dubar = self.input_discrete_tensor_fields[self._ubar] view = dubar.local_slices(ghosts=(G,) * dim) uBar = tuple(Vi[view] for Vi in dubar.buffers) self.get_ubar = lambda: uBar
[docs] @debug def get_work_properties(self): requests = super().get_work_properties() buffers = MemoryRequest.empty_like( a=self.V[0], nb_components=4, backend=self.dvelocity.backend ) requests.push_mem_request("Wtmp", buffers) return requests
[docs] def setup(self, work): super().setup(work=work) Wtmp = work.get_buffer(self, "Wtmp", handle=True) self.tmp_v = Wtmp[0:3] self.tmp = Wtmp[-1] if self.velocity.dim == 2: self._compute_vorticity = self._compute_vorticity_2d if self.velocity.dim == 3: self._compute_vorticity = self._compute_vorticity_3d
[docs] @classmethod def supported_dimensions(cls): return (3, 2)
[docs] @classmethod def supports_mpi(cls): return True
@op_apply def apply(self, **kwds): super().apply(**kwds) self.v_ghost_exchanger() # Penalize velocity self._compute_penalization() for ubar, v, tmp in zip(self.get_ubar(), self.V, self.tmp_v): tmp[...] = (v - ubar) * self.tmp self._compute_vorticity() self.w_ghost_exchanger() def _compute_vorticity_3d(self): # compute penalized vorticity: # (dvz/dy - dvy/dz) # W = (dvx/dz - dvz/dx) # (dvy/dx - dvx/dy) # X direction self.tmp = self.stencil[0](a=self.tmp_v[2], out=self.tmp, axis=2) self.W[1][...] += -self.tmp self.tmp = self.stencil[0](a=self.tmp_v[1], out=self.tmp, axis=2) self.W[2][...] += self.tmp # Y direction self.tmp = self.stencil[1](a=self.tmp_v[0], out=self.tmp, axis=1) self.W[2][...] += -self.tmp self.tmp = self.stencil[1](a=self.tmp_v[2], out=self.tmp, axis=1) self.W[0][...] += self.tmp # Z direction self.tmp = self.stencil[2](a=self.tmp_v[1], out=self.tmp, axis=0) self.W[0][...] += -self.tmp self.tmp = self.stencil[2](a=self.tmp_v[0], out=self.tmp, axis=0) self.W[1][...] += self.tmp def _compute_vorticity_2d(self): # compute penalized vorticity: # W = (dvy/dx - dvx/dy) # X direction self.tmp = self.stencil[0](a=self.tmp_v[1], out=self.tmp, axis=1) self.W[0][...] += self.tmp # Y direction self.tmp = self.stencil[1](a=self.tmp_v[0], out=self.tmp, axis=0) self.W[0][...] += -self.tmp
[docs] class PythonPenalizeVelocity(HostOperator, CommonPenalization): """ """ __default_method = {SpaceDiscretization: SpaceDiscretization.FDC4} __available_methods = { SpaceDiscretization: (InstanceOf(SpaceDiscretization), InstanceOf(int)) }
[docs] @classmethod def default_method(cls): dm = super().default_method() dm.update(cls.__default_method) return dm
[docs] @classmethod def available_methods(cls): am = super().available_methods() am.update(cls.__available_methods) return am
@debug def __new__( cls, obstacles, variables, velocity, dt, coeff=None, ubar=None, formulation=None, **kwds, ): return super().__new__( cls, input_fields=None, output_fields=None, input_params=None, **kwds ) @debug def __init__( self, obstacles, variables, velocity, dt, coeff=None, ubar=None, formulation=None, **kwds, ): check_instance(velocity, Field) check_instance(variables, dict, keys=Field, values=CartesianTopologyDescriptors) input_fields = { velocity: variables[velocity], } output_fields = {velocity: variables[velocity]} input_params = {dt} for o in obstacles.values() if isinstance(obstacles, dict) else obstacles: input_fields[o] = variables[o] if isinstance(ubar, Field): input_fields[ubar] = variables[ubar] self.velocity = velocity self.dt = dt super().__init__( input_fields=input_fields, output_fields=output_fields, input_params=input_params, obstacles=obstacles, velocity=velocity, dt=dt, coeff=coeff, ubar=ubar, formulation=formulation, **kwds, )
[docs] @debug def discretize(self): if self.discretized: return super().discretize() self.dvelocity = self.input_discrete_tensor_fields[self.velocity] dv = self.dvelocity self.dobstacles = {} for c, o in self.obstacles.items(): o_df = self.input_discrete_fields[o] self.dobstacles[c] = o_df.data[0] self.v_ghost_exchanger = dv.build_ghost_exchanger() if self.v_ghost_exchanger is None: def _dummy_func(): pass self.v_ghost_exchanger = _dummy_func if isinstance(self._ubar, TensorParameter): self.get_ubar = lambda: self._ubar()[::-1] else: dubar = self.input_discrete_tensor_fields[self._ubar] self.get_ubar = lambda: dubar.buffers
[docs] @debug def get_work_properties(self): requests = super().get_work_properties() buffers = MemoryRequest.empty_like( a=self.dvelocity.buffers[0], nb_components=1, backend=self.dvelocity.backend ) requests.push_mem_request("Wtmp", buffers) return requests
[docs] def setup(self, work): super().setup(work=work) Wtmp = work.get_buffer(self, "Wtmp", handle=True) self.tmp = Wtmp[-1]
[docs] @classmethod def supported_dimensions(cls): return (3, 2)
[docs] @classmethod def supports_mpi(cls): return True
@op_apply def apply(self, **kwds): super().apply(**kwds) # Penalize velocity self._compute_penalization() for ubar, v in zip(self.get_ubar(), self.dvelocity.buffers): v[...] += (v - ubar) * self.tmp self.v_ghost_exchanger()